Network of Things demo

Start a Mosquitto container first. For example:

  • Use codes\_demo\1_start_broker.sh to start a Mosquitto container on Raspberry Pi.
  • Config files are in mqtt_config\mqtt.
  • set allow_anonymous true in mqtt_config\mqtt\config\mosquitto.conf to allow anonymous client.

Getting Started

What this notebook does:

  • Using a client on PC
  • List connected nodes
  • Send messages to remote nodes:
    • Return results (read GPIOs)via RPC mechanism.
    • Write data to remote nodes (write GPIOs).
    • Execute arbitrary code on remote nodes.

In [1]:
import os
import sys
import time
 
sys.path.append(os.path.abspath(os.path.join(os.path.pardir, os.path.sep.join(['..', 'codes']), 'client')))
sys.path.append(os.path.abspath(os.path.join(os.path.pardir, os.path.sep.join(['..', 'codes']), 'node')))
sys.path.append(os.path.abspath(os.path.join(os.path.pardir, os.path.sep.join(['..', 'codes']), 'shared')))
sys.path.append(os.path.abspath(os.path.join(os.path.pardir, os.path.sep.join(['..', 'codes']), 'micropython')))
 
import client
from collections import OrderedDict

Start client


In [2]:
the_client = client.Client()
the_client.start()

while not the_client.status['Is connected']:            
    time.sleep(1)
    print('Node not ready yet.')


My name is Client_366

Sending 277 bytes
Message:
OrderedDict([('command', 'set connection name'), ('correlation_id', '2018-04-06 16:59:53.038210'), ('kwargs', {'name': 'Client_366'}), ('message_id', '2018-04-06 16:59:53.038210'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


[Connected: ('123.240.210.68', 1883)]
[Listen to messages]
Node not ready yet.

Prepare messages


In [3]:
# messages _____________________________________________
messages = OrderedDict()

messages['read_GPIOs'] = {'message_type': 'command',
                          'command': 'read GPIOs',
                          'kwargs': {'pins': [5, 12, 13, 14, 15, 16]},
                          'need_result': True}

messages['blink_led'] = {'message_type': 'command',
                         'command': 'blink led',
                         'kwargs': {'times': 3, 'forever': False,
                                    'on_seconds': 0.1, 'off_seconds': 0.1}}


Data received: 255 bytes
Message:
OrderedDict([('command', 'set connection name'), ('correlation_id', '327027'), ('kwargs', {'name': 'NodeMCU_1d73c000'}), ('message_id', '327027'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Hub'), ('reply_to', 'NodeMCU_1d73c000'), ('sender', 'NodeMCU_1d73c000')])


In [4]:
# remote_nodes = ['n_Lambda', 'n_Alpha', 'n_Beta']

In [5]:
# remote_node = 'n_Alpha'
# messages_ext = {}


# messages_ext['blink_led'] = {'message_type': 'command',
#                              'command': 'blink led',
#                              'kwargs': {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}}
# the_client.request('Hub', messages_ext['blink_led']) 


# messages_ext['write_GPIOs'] = {'message_type': 'command',
#                                'command': 'write GPIOs',
#                                'kwargs': {'pins_and_values': [(2, 0), (2, 1), (2, 0),]}} 
# the_client.request(remote_node, messages_ext['write_GPIOs']) 


# messages_ext['test_eval'] = {'message_type': 'eval',
#                              'to_evaluate': '2+3',
#                              'need_result': True}                                   
# _, result = the_client.request(remote_node, messages_ext['test_eval']) 
# print('result:', result.get())


# messages_ext['test_exec'] = {'message_type': 'exec',
#                              'to_exec': 'print("Hello World!")'} 
# the_client.request(remote_node, messages_ext['test_exec']) 


# with open('script_to_deploy.py') as f:
#     script = f.read()        
# messages_ext['test_upload_script'] = {'message_type': 'script', 
#                                       'script': script} 
# the_client.request(remote_node, messages_ext['test_upload_script'])


# messages_ext['test_function'] = {'message_type': 'function',
#                                  'function': 'blink_led',
#                                  'kwargs': {'times': 3, 'forever': False, 
#                                             'on_seconds': 0.1, 'off_seconds': 0.1}}
# the_client.request(remote_node, messages_ext['test_function'])

List connected nodes


In [6]:
the_client.node.worker.roll_call()
time.sleep(2)
remote_nodes = sorted(the_client.node.worker.contacts.keys())

print('\n[____________ Connected nodes ____________]\n')
print('\nConnected nodes:\n{}\n'.format(remote_nodes))


Sending 249 bytes
Message:
OrderedDict([('correlation_id', '2018-04-06 11:17:56.449591'), ('function', 'check_in'), ('kwargs', {'caller': 'Client_366'}), ('message_id', '2018-04-06 11:17:56.449591'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 249 bytes
Message:
OrderedDict([('correlation_id', '2018-04-06 11:17:56.449591'), ('function', 'check_in'), ('kwargs', {'caller': 'Client_366'}), ('message_id', '2018-04-06 11:17:56.449591'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Sending 172 bytes

Data received: 172 bytes
Message:
OrderedDict([('function', 'register_contact'), ('kwargs', {'contact_id': 'Client_366', 'name': 'Client_366'}), ('message_type', 'function'), ('receiver', 'Client_366'), ('sender', 'Client_366')])


[____________ Connected nodes ____________]


Connected nodes:
['Client_366']


In [7]:
# remote_nodes = ['n_Lambda', 'n_Alpha', 'n_Beta']

YouTube video clip


In [8]:
for remote_node in remote_nodes:
    the_client.request(remote_node, messages['blink_led'])


Sending 300 bytes
Sending 310 bytes

Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2017-12-15 11:04:39.988897'), ('kwargs', {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}), ('message_id', '2017-12-15 11:04:39.988897'), ('message_type', 'command'), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])
Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2017-12-15 11:04:40.024993'), ('kwargs', {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}), ('message_id', '2017-12-15 11:04:40.024993'), ('message_type', 'command'), ('receiver', 'NodeMCU_30aea44cbdc8'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])



Sending 306 bytes
Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2017-12-15 11:04:40.038063'), ('kwargs', {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}), ('message_id', '2017-12-15 11:04:40.038063'), ('message_type', 'command'), ('receiver', 'NodeMCU_cdf80a00'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 300 bytes
Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2017-12-15 11:04:39.988897'), ('kwargs', {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}), ('message_id', '2017-12-15 11:04:39.988897'), ('message_type', 'command'), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


In [5]:
the_client.request('Hub', messages['blink_led'])


Sending 293 bytes
Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2018-04-06 17:00:32.894453'), ('kwargs', {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}), ('message_id', '2018-04-06 17:00:32.894453'), ('message_type', 'command'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])

Out[5]:
({'sender': 'Client_366',
  'receiver': 'Hub',
  'message_type': 'command',
  'message_id': '2018-04-06 17:00:32.894453',
  'command': 'blink led',
  'kwargs': {'times': 3,
   'forever': False,
   'on_seconds': 0.1,
   'off_seconds': 0.1},
  'reply_to': 'Client_366',
  'correlation_id': '2018-04-06 17:00:32.894453'},
 None)
Data received: 293 bytes
Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2018-04-06 17:00:32.894453'), ('kwargs', {'times': 3, 'forever': False, 'on_seconds': 0.1, 'off_seconds': 0.1}), ('message_id', '2018-04-06 17:00:32.894453'), ('message_type', 'command'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])

Read one GPIO pin


In [12]:
for remote_node in remote_nodes:
    _, result = the_client.request(remote_node, messages['read_GPIOs']) 
    print('\nGPIO status for {}: {}\n'.format(remote_node, result.get()))


Sending 286 bytes
Message:
OrderedDict([('command', 'read GPIOs'), ('correlation_id', '2017-12-18 10:45:04.108723'), ('kwargs', {'pins': [5, 12, 13, 14, 15, 16]}), ('message_id', '2017-12-18 10:45:04.108723'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 286 bytes
Message:
OrderedDict([('command', 'read GPIOs'), ('correlation_id', '2017-12-18 10:45:04.108723'), ('kwargs', {'pins': [5, 12, 13, 14, 15, 16]}), ('message_id', '2017-12-18 10:45:04.108723'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])

Processed result:
OrderedDict([('correlation_id', '2017-12-18 10:45:04.108723'), ('message_id', '2017-12-18 10:45:04.480486'), ('message_type', 'result'), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('result', 'Not applicable.'), ('sender', 'Client_366')])



Sending 223 bytes
Message:
OrderedDict([('correlation_id', '2017-12-18 10:45:04.108723'), ('message_id', '2017-12-18 10:45:04.480486'), ('message_type', 'result'), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('result', 'Not applicable.'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '2017-12-18 10:45:04.108723'), ('message_id', '2017-12-18 10:45:04.480486'), ('message_type', 'result'), ('receiver', 'Client_366'), ('reply_to', 'Client_366'), ('result', 'Not applicable.'), ('sender', 'Client_366')])

GPIO status for Client_366: Not applicable.



Sending 296 bytes
Message:
OrderedDict([('command', 'read GPIOs'), ('correlation_id', '2017-12-18 10:45:04.885329'), ('kwargs', {'pins': [5, 12, 13, 14, 15, 16]}), ('message_id', '2017-12-18 10:45:04.885329'), ('message_type', 'command'), ('need_result', True), ('receiver', 'NodeMCU_30aea44cbdc8'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 259 bytes
Message:
OrderedDict([('correlation_id', '2017-12-18 10:45:04.885329'), ('message_id', '284852'), ('message_type', 'result'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_30aea44cbdc8'), ('result', [[5, 1], [12, 0], [13, 0], [14, 1], [15, 1], [16, 0]]), ('sender', 'NodeMCU_30aea44cbdc8')])


GPIO status for NodeMCU_30aea44cbdc8: [[5, 1], [12, 0], [13, 0], [14, 1], [15, 1], [16, 0]]


Data received: 269 bytes
Message:
OrderedDict([('command', 'set connection name'), ('correlation_id', '1530303'), ('kwargs', {'name': 'NodeMCU_30aea44cbdc8'}), ('message_id', '1530303'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Hub'), ('reply_to', 'NodeMCU_30aea44cbdc8'), ('sender', 'NodeMCU_30aea44cbdc8')])

[WinError 10054] 遠端主機已強制關閉一個現存的連線。
[Closed: ('123.240.19.17', 1883)]

Send out messages and get asynchonous results


In [10]:
print('\n[______________ Sending messages ______________]\n')

results = []

# send out the messages
for message in messages.values():
    for remote_node in remote_nodes:
        if remote_node != the_client.node.worker.name:
            time.sleep(0.1)  # PyCharm needs this delay.
            formatted_message, asynch_result = the_client.request(remote_node, message)
            results.append((formatted_message, asynch_result))


[______________ Sending messages ______________]


Sending 292 bytes
Message:
OrderedDict([('command', 'read GPIOs'), ('correlation_id', '2017-09-27 22:34:37.179200'), ('kwargs', {'pins': [5, 12, 13, 14, 15, 16]}), ('message_id', '2017-09-27 22:34:37.179200'), ('message_type', 'command'), ('need_result', True), ('receiver', 'NodeMCU_1d73c000'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Sending 306 bytes
Message:
OrderedDict([('command', 'blink led'), ('correlation_id', '2017-09-27 22:34:37.374800'), ('kwargs', {'times': 3, 'off_seconds': 0.1, 'on_seconds': 0.1, 'forever': False}), ('message_id', '2017-09-27 22:34:37.374800'), ('message_type', 'command'), ('receiver', 'NodeMCU_1d73c000'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 251 bytes
Message:
OrderedDict([('correlation_id', '2017-09-27 22:34:37.179200'), ('message_id', '136860'), ('message_type', 'result'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_1d73c000'), ('result', [[5, 0], [12, 1], [13, 1], [14, 1], [15, 0], [16, 0]]), ('sender', 'NodeMCU_1d73c000')])

Actually get the results


In [11]:
# collect and print results        
print('\n[_________ Wait few seconds for reply _________]\n')
for (message, result) in results:
    try:
        if message.get('need_result'):
            print('\n[Result for request]:\n___Request___:\n{0}\n___Result____:\n{1}\n'.format(message,
                                                                                               result.get() if result else None))
    except Exception as e:
        print('\n[{}]\nMessage:\n{}'.format(e, message))
        
# Wait a while
time.sleep(3)


[_________ Wait few seconds for reply _________]


[Result for request]:
___Request___:
{'message_id': '2017-09-27 22:34:37.179200', 'correlation_id': '2017-09-27 22:34:37.179200', 'command': 'read GPIOs', 'receiver': 'NodeMCU_1d73c000', 'message_type': 'command', 'need_result': True, 'kwargs': {'pins': [5, 12, 13, 14, 15, 16]}, 'sender': 'Client_366', 'reply_to': 'Client_366'}
___Result____:
[[5, 0], [12, 1], [13, 1], [14, 1], [15, 0], [16, 0]]

Stop the demo


In [11]:
# Stopping
the_client.stop()
the_client = None
print('\n[________________ Demo stopped ________________]\n')


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-11-6f9e8aed4f6a> in <module>()
      1 # Stopping
----> 2 the_client.stop()
      3 the_client = None
      4 print('\n[________________ Demo stopped ________________]\n')

AttributeError: 'NoneType' object has no attribute 'stop'

In [ ]: